package com.atguigu.app.dws;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.bean.UserRegisterBean;
import com.atguigu.func.MyWindowFunction;
import com.atguigu.uitl.ClickHouseUtil;
import com.atguigu.uitl.MyKafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * Flink streaming job that counts user registrations per 10-second tumbling
 * event-time window and writes the result to ClickHouse.
 *
 * <p>Pipeline: Kafka topic {@code dwd_user_register} -> JSON parse -> watermark
 * assignment on {@code create_time} (2 s bounded out-of-orderness) -> one
 * {@link UserRegisterBean} per record with a register count of 1 -> global
 * (non-keyed) 10 s window that sums the counts -> ClickHouse table
 * {@code dws_user_user_register_window}.
 *
 * @author hjy
 * @create 2023/3/17 20:00
 */
public class Dws05_UserUserRegisterWindow {
    /**
     * Builds and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        // Step 1: obtain the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Checkpointing / state backend configuration, currently disabled:
//        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
//        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/gmall-flink/check");
//        env.getCheckpointConfig().setCheckpointTimeout(60000L);
//        env.setStateBackend(new HashMapStateBackend());
//        System.setProperty("HADOOP_USER_NAME","atguigu");

        // Step 2: read the raw records from Kafka (topic dwd_user_register).
        String topic = "dwd_user_register";
        String groupID = "user_register_0926";
        DataStreamSource<String> kafkaDs = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(topic, groupID));

        // Step 3: parse each record into a JSONObject and assign timestamps/watermarks.
        SingleOutputStreamOperator<JSONObject> jsonObjWithWMDS = kafkaDs.map(JSONObject::parseObject)
                .assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(TimestampAssignerSupplier.of(new SerializableTimestampAssigner<JSONObject>() {
                            @Override
                            public long extractTimestamp(JSONObject element, long recordTimestamp) {
                                Long createTime = element.getLong("create_time");
                                // getLong yields a boxed Long; auto-unboxing a null (field missing
                                // or null) would throw a bare NullPointerException and fail the job.
                                // Fall back to the timestamp already attached to the record instead.
                                return createTime != null ? createTime : recordTimestamp;
                            }
                        })));

        // Step 4: convert every record into a JavaBean carrying a count of 1.
        SingleOutputStreamOperator<UserRegisterBean> javaBeanDS = jsonObjWithWMDS.map(new MapFunction<JSONObject, UserRegisterBean>() {
            @Override
            public UserRegisterBean map(JSONObject value) throws Exception {
                // The payload itself is not needed: each record counts as one registration.
                // NOTE(review): the first two and the last constructor arguments are left
                // empty/null here; presumably they are filled in by the window function
                // in step 5 - confirm against UserRegisterBean / MyWindowFunction.
                return new UserRegisterBean("", "", 1L, null);
            }
        });

        // Step 5: 10-second tumbling event-time window over the whole stream, summing the counts.
        SingleOutputStreamOperator<UserRegisterBean> resultDS = javaBeanDS.windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
                .reduce(new ReduceFunction<UserRegisterBean>() {
                    @Override
                    public UserRegisterBean reduce(UserRegisterBean value1, UserRegisterBean value2) throws Exception {
                        // Accumulate into value1 and reuse it as the running aggregate.
                        value1.setRegisterCt(value1.getRegisterCt() + value2.getRegisterCt());
                        return value1;
                    }
                }, new AllWindowFunction<UserRegisterBean, UserRegisterBean, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow window, Iterable<UserRegisterBean> values, Collector<UserRegisterBean> out) throws Exception {
                        // After the incremental reduce the iterable holds the single aggregated bean.
                        UserRegisterBean next = values.iterator().next();
                        // Let the shared helper post-process the bean using the window metadata.
                        out.collect(MyWindowFunction.getJavaBeanField(next, window));
                    }
                });

        // Step 6: print the result and write it to ClickHouse.
        resultDS.print("resultDS>>>>>>>>>>>>>>");
        resultDS.addSink(ClickHouseUtil.getSinkFunction("insert into dws_user_user_register_window values(?,?,?,?)"));

        // Step 7: launch the job.
        env.execute("Dws05_UserUserRegisterWindow");
    }
}
